Conversation
…up the stack but instead via onError
I re-read the MSDN docs and found the previous implementation wasn't complying with the contract. http://msdn.microsoft.com/en-us/library/hh211815(v=vs.103).aspx I believe this now does.
|
Updated in another commit to bring inline with the MSDN docs. Unit tests that show the behavior: @Test
public void testForEach() {
Observable.create(new AsyncObservable()).forEach({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}
@Test
public void testForEachWithError() {
try {
Observable.create(new AsyncObservable()).forEach({ result -> throw new RuntimeException('err')});
fail("we expect an exception to be thrown");
}catch(Exception e) {
// do nothing as we expect this
}
verify(a, times(0)).received(1);
verify(a, times(0)).received(2);
verify(a, times(0)).received(3);
} |
There was a problem hiding this comment.
Verifications could be removed since there are no usages of a in the forEach closure.
There was a problem hiding this comment.
Good point. Submitting a commit to remove those now.
|
Looks good to me. Only comment i would make is in the InterruptedException catch. Since not throwing a InterruptedException but Runtime, it might be good to also add Thread.getCurrentThread().interrupt() to set the interrupt flag again. |
|
Interesting point, didn't even think of that as I figured the RuntimeException would be sufficient to cause the caller to behave appropriately, but they could theoretically swallow RuntimeExceptions and continue in a while loop check for isInterrupted() which could be lost in this scenario. Making that change now. |
Operator: forEach
Issue #45
Related to Pull #131